package com.atguigu.flink.state.operatorstate;

import com.nimbusds.jwt.EncryptedJWT;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by Smexy on 2023/11/15

 把收到的单词进行拼接之后再输出。
 */
public class Demo1_ListState
{
    public static void main(String[] args) {
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
            更改重启策略，默认是不重启

             fixedDelayRestart(
            int restartAttempts,  尝试重启的最大次数
            Time delayInterval   多次尝试重启之间的时间间隔
            )
         */
        //env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(2)));

        //每2s备份一次状态，当开启了checkpoint时，默认就是无限重启
        env.enableCheckpointing(2000);
        env.setParallelism(2);
                env
                   .socketTextStream("hadoop102", 8888)
                   .map(new MyMapFunction())
                    .addSink(new SinkFunction<String>()
                    {
                        @Override
                        public void invoke(String value, Context context) throws Exception {
                            if (value.contains("x")){
                                //手动模拟故障
                                throw new RuntimeException("出故障了...");
                            }
                            System.out.println(value);
                        }
                    });

        
                try {
                            env.execute();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
        
    }

    // 把收到的单词进行累积拼接之后再输出。
    private static class MyMapFunction implements MapFunction<String,String>, CheckpointedFunction
    {

        //Managed 状态。 flink帮你自动备份，及重启时自动恢复。用起来和list一模一样
        private ListState<String> result ;  //1
        private ListState<Integer> result1 ;
        private ListState<String> result2 ;
        private ListState<String> result3 ;
        @Override
        public String map(String value) throws Exception {
            //写状态
            result.add(value);
            //读状态
            return result.get().toString();
        }

        //周期（参考ck的配置）性备份状态。 备份是自动的，无需写任何内容
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("MyMapFunction.snapshotState");
        }

        //当Task启动或重启时，MyMapFunction对象就被创建，对象的属性result需要从之前备份的状态中恢复
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("MyMapFunction.initializeState");

            //获取到状态的备份
            OperatorStateStore operatorStateStore = context.getOperatorStateStore();
            //创建状态的描述
            ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("1", String.class);
            //从备份中根据描述获取之前备份的状态。第一次启动，此时会创建一个[]赋值给result
            result = operatorStateStore.getListState(listStateDescriptor);
        }
    }
}
